package edu.ucla.cs.rpc.multicast.network;

import java.io.IOException;
import java.net.SocketAddress;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;

import edu.ucla.cs.rpc.multicast.Dest;
import edu.ucla.cs.rpc.multicast.handlers.MessageHandler;
import edu.ucla.cs.rpc.multicast.network.message.Message;
import edu.ucla.cs.rpc.multicast.network.message.Message.MessageType;
import edu.ucla.cs.rpc.multicast.util.Network;

/**
 * MulticastMessageReceiver is a MessageReceiver that registers itself with a
 * MulticastManager. Any messages sent to the multicast group will be received
 * by this class. Furthermore, the MulticastManager keeps instances of
 * MulticastMessageReceiver updated with the current group membership list (both
 * Senders and Dests). This allows certain algorithms that rely on knowing group
 * membership to work properly.
 * 
 * @author Chase Covello Philip Russell
 * 
 */
public class MulticastMessageReceiver extends MessageReceiver implements Dest {

	/**
	 * This message handler intercepts join/leave messages sent by the
	 * multicast manager and uses them to keep the enclosing receiver's local
	 * view of the multicast group up to date. Every other message is passed
	 * through, untouched, to the wrapped message handler.
	 * 
	 * @author Chase Covello Philip Russell
	 * 
	 */
	private final class MulticastMessageReceiverMessageHandler implements
			MessageHandler {

		/** The handler that receives all non-membership messages. */
		private final MessageHandler handler;

		/**
		 * Construct a new MulticastMessageReceiverMessageHandler with the given
		 * message handler.
		 * 
		 * @param handler
		 *            the message handler that non-membership messages are
		 *            delegated to.
		 */
		public MulticastMessageReceiverMessageHandler(MessageHandler handler) {
			this.handler = handler;
		}

		/**
		 * Consumes join/leave messages, updating the enclosing receiver's
		 * membership sets with the sender ID carried by the message. All other
		 * messages are passed through to the wrapped handler.
		 * 
		 * @param message
		 *            the incoming message.
		 */
		public void receive(Message message) {
			MessageType type = message.getType();

			// The membership sets are concurrent collections, so single
			// add/remove calls are atomic and need no external locking.
			if (type == MessageType.JOIN_RECEIVERS) {
				receivers.add(message.getSenderID());
			} else if (type == MessageType.LEAVE_RECEIVERS) {
				receivers.remove(message.getSenderID());
			} else if (type == MessageType.JOIN_SENDERS) {
				senders.add(message.getSenderID());
			} else if (type == MessageType.LEAVE_SENDERS) {
				senders.remove(message.getSenderID());
			} else {
				handler.receive(message);
			}
		}

	}

	/** Address of the multicast manager this receiver registers with. */
	private final SocketAddress multicastManager;

	/** Randomly chosen identifier of this receiver within the group. */
	private final long nodeID;

	/*
	 * The membership sets are written by whichever thread delivers messages
	 * to the filtering handler and read by arbitrary callers through
	 * getReceivers()/getSenders(). A copy-on-write set lets those callers
	 * iterate the returned view without locking and without risking a
	 * ConcurrentModificationException. The sets are initialized here, rather
	 * than in the constructor body, so that they already exist when the
	 * filtering handler is installed.
	 */

	/** IDs of the receivers currently known to be in the group. */
	private final Set<Long> receivers = new CopyOnWriteArraySet<Long>();

	/** IDs of the senders currently known to be in the group. */
	private final Set<Long> senders = new CopyOnWriteArraySet<Long>();

	/**
	 * Construct a new MulticastMessageReceiver with the given multicast manager
	 * and message handler. Group membership messages are consumed internally;
	 * all other incoming messages are passed to the message handler. As part
	 * of construction a JOIN_RECEIVERS message carrying this receiver's node
	 * ID is sent to the multicast manager.
	 * 
	 * @param multicastManager
	 *            the multicast manager.
	 * @param handler
	 *            the message handler to use.
	 * @throws IOException
	 *             in the case of unrecoverable network errors.
	 */
	public MulticastMessageReceiver(SocketAddress multicastManager,
			MessageHandler handler) throws IOException {
		super(handler);

		this.multicastManager = multicastManager;

		// random 64 bit node ID; not guaranteed to be unique, but a collision
		// between two nodes of the same group is very unlikely
		nodeID = Network.getRandomLong();

		// we need this handler to "filter" group membership messages and act
		// upon them; it is installed only after all state it touches is set up
		setMessageHandler(new MulticastMessageReceiverMessageHandler(handler));

		// join the multicast group
		Message join = new Message(MessageType.JOIN_RECEIVERS,
				getSocketAddress());
		join.setSenderID(nodeID);
		join.send(multicastManager);
	}

	/**
	 * Returns the ID of this MulticastMessageReceiver.
	 * 
	 * @return the node ID.
	 */
	public long getNodeID() {
		return nodeID;
	}

	/**
	 * Returns a read-only Set of all the multicast message receiver IDs
	 * currently known to this receiver. The returned set is a live view: later
	 * join/leave messages are reflected in it. It may be iterated safely while
	 * membership changes; each iterator works on the membership as it was when
	 * the iterator was created.
	 * 
	 * @return the set of receiver IDs.
	 */
	public Set<Long> getReceivers() {
		return Collections.unmodifiableSet(receivers);
	}

	/**
	 * Returns a read-only Set of all multicast message sender IDs currently
	 * known to this receiver. The returned set is a live view: later
	 * join/leave messages are reflected in it. It may be iterated safely while
	 * membership changes; each iterator works on the membership as it was when
	 * the iterator was created.
	 * 
	 * @return the set of sender IDs.
	 */
	public Set<Long> getSenders() {
		return Collections.unmodifiableSet(senders);
	}

	/**
	 * Leaves the multicast group and shuts down any threads created by this
	 * message receiver. This method <b>must</b> be called before disposing of
	 * all references to this object; otherwise, its thread will continue to
	 * run until the application is forcibly terminated.
	 */
	public void shutdown() {
		try {
			// leave the multicast group
			Message leave = new Message(MessageType.LEAVE_RECEIVERS,
					getSocketAddress());
			leave.setSenderID(nodeID);
			leave.send(multicastManager);
		} catch (IOException e) {
			// Best effort: failing to notify the manager must not prevent the
			// local shutdown below, so the error is only reported.
			e.printStackTrace();
		} finally {
			super.shutdown();
		}
	}
}
